Extension point streamProcessor
In component org.nuxeo.runtime.stream.service
Contribution Descriptors
- Class: org.nuxeo.runtime.stream.StreamProcessorDescriptor
Existing Contributions
Contributions are presented in the same order as the registration order on this extension point. This order is displayed before the contribution name, in brackets.
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.runtime.stream.StreamMetricsProcessor" enabled="true" name="metrics"> <!-- To handle a MSK rolling upgrade we need 30min retries --> <policy continueOnFailure="false" delay="15s" maxDelay="120s" maxRetries="18" name="default"/> <stream codec="avro" name="input/null" partitions="1"/> <computation concurrency="1" name="stream/metrics"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.coldstorage.action.MoveToColdStorageContentAction" defaultConcurrency="2" defaultPartitions="4" name="moveToColdStorage"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.coldstorage.action.PropagateMoveToColdStorageContentAction" defaultConcurrency="2" defaultPartitions="4" name="propagateMoveToColdStorage"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.coldstorage.action.PropagateRestoreFromColdStorageContentAction" defaultConcurrency="2" defaultPartitions="4" name="propagateRestoreFromColdStorage"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.coldstorage.action.CheckColdStorageAvailabilityAction" defaultConcurrency="2" defaultPartitions="4" defaultScroller="default" name="checkColdStorageAvailability"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.core.security.RetentionExpiredAction" defaultConcurrency="1" defaultPartitions="1" name="retentionExpired"> <!-- continue on failure, because failure to expire retention doesn't give us an inconsistent state --> <policy continueOnFailure="true" delay="1s" maxDelay="60s" maxRetries="20" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <!-- Migration processor --> <streamProcessor class="org.nuxeo.ecm.core.migrator.AbstractBulkMigrator$MigrationAction" defaultConcurrency="2" defaultPartitions="4" name="migration" start="false"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.core.blob.stream.StreamOrphanBlobGC" defaultCodec="avro" defaultConcurrency="1" defaultPartitions="1" enabled="true" name="blobGC"> <policy continueOnFailure="true" delay="3s" maxDelay="60s" maxRetries="3" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.ecm.core.model.stream.StreamDocumentGC" defaultCodec="avro" defaultConcurrency="1" defaultPartitions="1" enabled="true" name="documentGC"> <policy continueOnFailure="true" delay="3s" maxDelay="60s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.core.action.DeletionAction" defaultConcurrency="2" defaultPartitions="4" name="deletion"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <!-- GarbageCollectOrphanBlobs processor --> <streamProcessor class="org.nuxeo.ecm.core.action.GarbageCollectOrphanBlobsAction" defaultConcurrency="2" defaultPartitions="4" name="garbageCollectOrphanBlobs"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.core.action.GarbageCollectOrphanVersionsAction" defaultConcurrency="2" defaultPartitions="4" name="garbageCollectOrphanVersions"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.drive.action.FireGroupUpdatedEventAction" defaultConcurrency="2" defaultPartitions="4" name="driveFireGroupUpdatedEvent"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <!-- CSV exporter processor --> <streamProcessor class="org.nuxeo.ecm.platform.csv.export.action.CSVExportAction" defaultConcurrency="2" defaultPartitions="4" name="csvExport"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> <stream name="bulk/makeBlob"> <filter class="org.nuxeo.ecm.core.transientstore.computation.TransientStoreOverflowRecordFilter" name="overflow"> <option name="storeName">default</option> <option name="prefix">csvoverflow</option> <option name="thresholdSize">990000</option> </filter> </stream> <option name="produceImmediate">true</option> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.platform.picture.recompute.RecomputeViewsAction" defaultConcurrency="2" defaultPartitions="6" name="recomputeViews"> <policy continueOnFailure="true" delay="5s" maxDelay="10s" maxRetries="1" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.ecm.platform.picture.recompute.RecomputeViewsAction" defaultConcurrency="2" defaultPartitions="12" enabled="false" name="recomputeViewsBackground"> <policy continueOnFailure="true" delay="5s" maxDelay="10s" maxRetries="1" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.platform.video.action.RecomputeVideoConversionsAction" defaultConcurrency="2" defaultPartitions="6" name="recomputeVideoConversions"> <policy continueOnFailure="true" delay="5s" maxDelay="10s" maxRetries="1" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.retention.actions.HoldDocumentsAction" defaultConcurrency="2" defaultPartitions="4" name="holdDocumentsAction"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.retention.actions.UnholdDocumentsAction" defaultConcurrency="2" defaultPartitions="4" name="unholdDocumentsAction"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.retention.actions.AttachRetentionRuleAction" defaultConcurrency="2" defaultPartitions="4" name="attachRetentionRule"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.retention.actions.EvalInputEventBasedRuleAction" defaultConcurrency="2" defaultPartitions="4" name="evalInputEventBasedRule"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.retention.actions.ProcessRetentionEventAction" defaultConcurrency="2" defaultPartitions="4" name="processRetentionEvent"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.platform.routing.core.bulk.DocumentRoutingEscalationAction" defaultConcurrency="2" defaultPartitions="4" name="DocumentRoutingEscalationAction"> <policy continueOnFailure="true" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <!-- GarbageCollectOrphanRoute processor --> <streamProcessor class="org.nuxeo.ecm.platform.routing.core.bulk.GarbageCollectRoutesAction" defaultConcurrency="2" defaultPartitions="4" name="garbageCollectWokflows"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.platform.audit.impl.StreamAuditWriter" defaultCodec="avro" defaultConcurrency="1" defaultPartitions="1" enabled="true" name="auditWriter"> <policy batchCapacity="25" batchThreshold="500ms" continueOnFailure="false" delay="1s" maxDelay="60s" maxRetries="20" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.core.bulk.BulkServiceProcessor" defaultCodec="avro" defaultConcurrency="1" defaultExternal="true" defaultPartitions="1" name="bulkServiceProcessor" start="false"> <stream external="false" name="bulk/command"/> <stream external="false" name="bulk/status"/> <stream external="false" name="bulk/done"/> <policy continueOnFailure="false" delay="1s" maxDelay="60s" maxRetries="0" name="bulk/scroller"/> <policy continueOnFailure="false" delay="1s" maxDelay="60s" maxRetries="20" name="bulk/status"/> <computation concurrency="2" name="bulk/scroller"/> <computation concurrency="1" name="bulk/status"/> </streamProcessor> <streamProcessor class="org.nuxeo.ecm.core.bulk.introspection.StreamIntrospectionProcessor" defaultCodec="avro" defaultConcurrency="1" defaultPartitions="1" enabled="true" name="streamIntrospection"/> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <!-- SetProperty processor --> <streamProcessor class="org.nuxeo.ecm.core.bulk.action.SetPropertiesAction" defaultConcurrency="2" defaultPartitions="4" name="setProperties"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <!-- SetSystemProperty processor --> <streamProcessor class="org.nuxeo.ecm.core.bulk.action.SetSystemPropertiesAction" defaultConcurrency="2" defaultPartitions="4" name="setSystemProperties"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <!-- RemoveProxy processor --> <streamProcessor class="org.nuxeo.ecm.core.bulk.action.RemoveProxyAction" defaultConcurrency="2" defaultPartitions="4" name="removeProxy"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <!-- Trash processor --> <streamProcessor class="org.nuxeo.ecm.core.bulk.action.TrashAction" defaultConcurrency="1" defaultPartitions="1" name="trash"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.automation.core.operations.services.bulk.AutomationBulkAction" defaultConcurrency="2" defaultPartitions="4" name="automation"> <policy continueOnFailure="true" delay="1s" maxRetries="3" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.ecm.automation.core.operations.services.bulk.AutomationBulkActionUi" defaultConcurrency="2" defaultPartitions="4" name="automationUi"> <policy continueOnFailure="true" delay="1s" maxRetries="3" name="default"/> <option name="failOnError">false</option> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.core.storage.action.ExtractBinaryFulltextAction" defaultConcurrency="2" defaultPartitions="4" name="extractBinaryFulltext"> <policy continueOnFailure="true" delay="1s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.elasticsearch.bulk.IndexAction" defaultConcurrency="2" defaultPartitions="4" enabled="true" name="indexAction"> <policy continueOnFailure="false" delay="1s" maxDelay="60s" maxRetries="20" name="default"/> <!-- fetch content and build indexing requests --> <computation concurrency="4" name="bulk/index"/> <stream name="bulk/index" partitions="12"/> <!-- submit requests to elastic --> <computation concurrency="2" name="bulk/bulkIndex"/> <stream name="bulk/bulkIndex" partitions="8"> <filter class="org.nuxeo.ecm.core.transientstore.computation.TransientStoreOverflowRecordFilter" name="overflow"> <option name="storeName">default</option> <option name="prefix">index</option> <option name="thresholdSize">990000</option> </filter> </stream> <computation concurrency="1" name="bulk/indexCompletion"/> <!-- optimal size of the elasticsearch bulk request --> <option name="esBulkSizeBytes">5242880</option> <!-- max number of actions in the elasticsearch bulk request --> <option name="esBulkActions">1000</option> <!-- flush elasticsearch bulk request interval --> <option name="flushIntervalSeconds">5</option> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.core.bulk.S3SetBlobLengthAction" defaultConcurrency="2" defaultPartitions="4" enabled="false" name="s3SetBlobLength"> <policy continueOnFailure="true" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <!-- Update Read ACLs processor --> <streamProcessor class="org.nuxeo.ecm.core.storage.dbs.action.UpdateReadAclsAction" defaultConcurrency="1" defaultPartitions="1" name="updateReadAcls"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.platform.thumbnail.action.RecomputeThumbnailsAction" defaultConcurrency="2" defaultPartitions="6" name="recomputeThumbnails"> <policy continueOnFailure="true" delay="5s" maxDelay="10s" maxRetries="1" name="default"/> </streamProcessor> </extension>